/*
 * Copyright Debezium Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.debezium.embedded;

import static org.junit.Assert.fail;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.debezium.config.Configuration;
import io.debezium.data.SchemaUtil;
import io.debezium.data.VerifyRecord;
import io.debezium.data.VerifyRecord.RecordValueComparator;
import io.debezium.document.Array;
import io.debezium.document.ArrayReader;
import io.debezium.document.ArrayWriter;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.document.Value;
import io.debezium.embedded.EmbeddedEngine.CompletionCallback;
import io.debezium.embedded.EmbeddedEngine.CompletionResult;
import io.debezium.util.Clock;
import io.debezium.util.Collect;
import io.debezium.util.IoUtil;
import io.debezium.util.Iterators;
import io.debezium.util.Iterators.PreviewIterator;
import io.debezium.util.LoggingContext;
import io.debezium.util.LoggingContext.PreviousContext;
import io.debezium.util.Strings;
import io.debezium.util.Testing;
import io.debezium.util.Threads;
import io.debezium.util.Threads.TimeSince;

/**
 * A base class for classes the define integration tests for connectors that verify the output of a connector matches an
 * expected sequence of change event records. This base class provides a simple framework for running a connector given
 * a configuration and either recording or comparing the connector's output to expected results.
 *
 * <h2>Overview</h2>
 * One of the common and easiest ways to test a connector is to simply run it and compare the changes events output by the
 * connector with a sequence of expected change events. Manually writing tests to compare the results is tedious and error
 * prone, so an alternative is to run the test once and <i>record</i> the change events output by the connector so that
 * these change events can be used as expected results in subsequent runs of the same test.
 * <p>
 * This framework makes it possible to define tests that start a connector, record the change events output by the connector,
 * and compare these change events to expected results. Connectors are defined via properties files, and the expected
 * results are recorded to or read from files containing the sequence of change events serialized as an array of JSON documents.
 * The test can also define an optional environment properties file in which test-specific parameters can be specified.
 *
 * <h3>Variations in results</h3>
 * Some fields in the output will change every time the connector is run. For example, a change event may include the
 * timestamp at which the connector processed/generated the event, or it might include a transaction identifier that is
 * different with each run. The framework works around these issues by allowing the test to specify in the environment
 * parameters the fields in the change events that should be ignored when comparing actual change events to expected change
 * events.
 * <p>
 * Other fields may vary every time the database is started. For example, a MySQL server that is configured to use GTIDs will
 * generate a new {@code server_uuid} value when it is initialized, and this value is then used in all GTIDs generated by
 * that server instance. Since multiple builds will start up and initialize a new MySQL server (in a Docker container), the
 * GTID will vary with each build. The framework can handle this by allowing the expected results to contain variables in
 * the form of <code>${variableName1}</code>, and the framework will use values supplied during test setup to replace these
 * variables prior to parsing the JSON document for each expected change event. The framework allows the test setup to
 * supply a function that provides the replacement variable values given the connector configuration. So the MySQL connector
 * tests, for example, can connect to the MySQL server and return all of the system variables as the variable values;
 * the framework does all the rest.
 *
 * <h3>Stopping the connector</h3>
 * When the test framework has compared all of the generated change events to the expected change events and found no
 * discrepancies, the framework knows that the connector should generate no additional change events. If the connector does
 * generate another event, the test will fail. Otherwise, the framework will stop the connector since it is no longer needed.
 *
 * <h3>Connector restarts</h3>
 * Another requirement is to verify that a connector will resume where it left off when it is restarted following a failure
 * or graceful shutdown. This is normally a bit harder to do, but this framework makes it very easy. Simply insert in the
 * sequence of expected change events a <i>restart command</i>. The framework consumes each change event generated by the
 * connector one at a time, and for each it compares it to the expected change event. As the framework comes across a
 * <i>restart command</i> in the expected results, it knows to stop the connector (persisting the offsets in the last
 * change event processed by the connector) and to then restart it given those persisted offsets. A properly-written connector
 * will resume exactly where it left off, and the framework can once again begin comparing the resulting change events
 * to the next events in the expected results.
 *
 * <h2>Example</h2>
 * A simple test runs a connector and records/compares the connector's output to expected results:
 *
 * <pre>
 * &#64;Test
 * public void shouldCaptureChangesFromMySqlWithNoGtids() {
 *     runConnector(usingSpec("test1", "src/test/resources/expected/test1"));
 * }
 * </pre>
 *
 * The framework uses a <i>test specification</i> that encapsulates the connector configuration, the environment properties, how
 * to obtain the variable values, and where to find the results. This example creates a test specification with the name
 * {@code test1} and the connector properties file, the environment properties file, and the expected results file all defined
 * in a directory named {@code src/test/resources/expected/test1}. Once again, the test will write out the expected results
 * file if none exists; otherwise, it will read in the expected results and compare them to the events output by the connector.
 * <p>
 * The framework supports other variations, and the {@code specWith} method is a simple convenience method that replaces
 * explicitly building up the test specification by using lots of defaults. Consequently, the framework offers a great deal
 * of flexibility.
 * <p>
 * To see more detail about the actual and expected records, turn on Debezium's test debug mode by using this line
 * before the {@code runConnector} call:
 *
 * <pre>
 * Testing.Debug.enable();
 * </pre>
 *
 * @author Randall Hauch
 */
public abstract class ConnectorOutputTest {

    public static final String DEFAULT_CONNECTOR_PROPERTIES_FILENAME = "connector.properties";
    public static final String DEFAULT_ENV_PROPERTIES_FILENAME = "env.properties";
    public static final String DEFAULT_EXPECTED_RECORDS_FILENAME = "expected-records.json";

    public static final String ENV_CONNECTOR_TIMEOUT_IN_SECONDS = "-connector.timeout.in.seconds";
    public static final String ENV_IGNORE_FIELDS = "ignore.fields";

    /**
     * Before each test this class will delete the "target/connector-output" directory under the project in which the test is run.
     */
    public static final Path CONNECTOR_OUTPUT_PATH = Testing.Files.createTestingPath("connector-output");
    protected static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath("integration-test-connector-offsets.data")
            .toAbsolutePath();

    public static final String CONTROL_KEY = "connector";
    public static final String CONTROL_RESTART = "restart";
    public static final String CONTROL_STOP = "stop";
    public static final String CONTROL_END = "end";

    private static enum ExecutionResult {
        /**
         * The connector stopped after actual records did not match expected records.
         */
        ERROR,
        /**
         * The connector stopped after an unknown error or problem.
         */
        EXCEPTION,
        /**
         * The connector stopped as requested in the expected results.
         */
        STOPPED,
        /**
         * The connector stopped as requested in the expected results and is to be restarted.
         */
        RESTART_REQUESTED;
    }

    @FunctionalInterface
    public static interface TestData extends AutoCloseable {
        /**
         * Read the records that are expected by the test.
         *
         * @return the expected records, or null if there are none
         * @throws IOException if there is a problem reading the expected records
         */
        Iterator<Document> read() throws IOException;

        /**
         * Record the function that should be called with each of the actual records output by the connector. By default this
         * method does nothing.
         *
         * @param sourceRecord the JSON representation of the source record; never null
         */
        default void write(Document sourceRecord) {
            // do nothing
        }

        /**
         * If records are being recorded, then complete writing them and close all resources associated with the test data.
         * By default this method does nothing.
         *
         * @throws IOException if there is a problem writing the expected records
         */
        @Override
        default void close() throws IOException {
            // do nothing
        }
    }

    public static class TestSpecification {

        private final String name;
        private final Configuration config;
        private final Configuration env;
        private final Function<TestSpecification, TestData> dataSupplier;
        private final AvailableVariables variables;
        private final AtomicReference<TestData> cachedData = new AtomicReference<>();

        public TestSpecification(String name, Configuration config, Configuration env, Function<TestSpecification, TestData> dataSupplier,
                                 AvailableVariables variables) {
            this.name = name != null ? name : "";
            this.variables = (variables != null ? variables : AvailableVariables.empty()).and(builtInVariables());
            this.config = config != null ? config.withReplacedVariables(this.variables::variableForName) : Configuration.empty();
            this.env = env != null ? env.withReplacedVariables(this.variables::variableForName) : Configuration.empty();
            this.dataSupplier = dataSupplier != null ? dataSupplier : (spec) -> () -> Iterators.empty();
        }

        private AvailableVariables builtInVariables() {
            Map<String, String> builtIns = Collect.hashMapOf("dbz.test.name", name());
            System.getProperties().forEach((key, value) -> builtIns.put(key.toString(), value.toString()));
            return builtIns::get;
        }

        /**
         * Get the name of this test.
         *
         * @return the name of the test; never null
         */
        public String name() {
            return name;
        }

        /**
         * Get the connector's configuration as would normally be passed to Kafka Connect when deploying the connector.
         *
         * @return the configuration; never null
         */
        public Configuration config() {
            return config;
        }

        /**
         * Get the configuration of the test environment. Often no custom environment configuration properties are required,
         * but they can be used to supply configuration properties to the {@link EmbeddedEngine} and the
         * {@link JsonConverter}, {@link JsonSerializer} and {@link JsonDeserializer} instances used to read and write
         * the expected records, as well as the following:
         * <ul>
         * <li>{@value #ENV_CONNECTOR_TIMEOUT_IN_SECONDS} for the maximum time the connector should wait for records before
         * stopping</li>
         * </ul>
         *
         * @return the test environment configuration; never null
         */
        public Configuration environment() {
            return env;
        }

        /**
         * Get the data for this test. This method will be called once per execution.
         *
         * @return the test data; never null
         */
        public TestData testData() {
            if (cachedData.get() == null) {
                // No cached data at this instance, but handle in a thread-safe manner ...
                cachedData.compareAndSet(null, dataSupplier.apply(this));
            }
            return cachedData.get();
        }

        /**
         * Get the system, environmental, and other variables.
         *
         * @return the variables; never null
         */
        public AvailableVariables variables() {
            return variables;
        }

        /**
         * Create a new test specification that is a copy of this specification except with the given name.
         *
         * @param name the new name
         * @return the new test specification; never null
         */
        public TestSpecification withName(String name) {
            return new TestSpecification(name, config, env, dataSupplier, variables);
        }

        /**
         * Create a new test specification that is a copy of this specification except with the given configuration.
         *
         * @param config the new configuration
         * @return the new test specification; never null
         */
        public TestSpecification withConfiguration(Configuration config) {
            return new TestSpecification(name, config, env, dataSupplier, variables);
        }

        /**
         * Create a new test specification that is a copy of this specification except with the given configuration.
         *
         * @param file the configuration file; may not be null
         * @return the new test specification; never null
         */
        public TestSpecification withConfiguration(File file) {
            try {
                try (InputStream stream = new FileInputStream(file)) {
                    return withConfiguration(stream);
                }
            }
            catch (IOException e) {
                fail("Failed to read the configuration file '" + file + "': " + e.getMessage());
                return null;
            }
        }

        /**
         * Create a new test specification that is a copy of this specification except with the given configuration.
         *
         * @param stream the input stream to the configuration file; may not be null
         * @return the new test specification; never null
         */
        public TestSpecification withConfiguration(InputStream stream) {
            Properties props = new Properties();
            try {
                props.load(stream);
            }
            catch (IOException e) {
                fail("Failed to read the configuration file from the input stream': " + e.getMessage());
            }
            return withConfiguration(Configuration.from(props));
        }

        /**
         * Create a new test specification that is a copy of this specification except with the given environment.
         *
         * @param env the new environment
         * @return the new test specification; never null
         */
        public TestSpecification withEnvironment(Configuration env) {
            return new TestSpecification(name, config, env, dataSupplier, variables);
        }

        /**
         * Create a new test specification that is a copy of this specification except with the given environment.
         *
         * @param file the file containing the test environment; may not be null
         * @return the new test specification; never null
         */
        public TestSpecification withEnvironment(File file) {
            try {
                try (InputStream stream = new FileInputStream(file)) {
                    return withEnvironment(stream);
                }
            }
            catch (IOException e) {
                fail("Failed to read the environment file '" + file + "': " + e.getMessage());
                return null;
            }
        }

        /**
         * Create a new test specification that is a copy of this specification except with the given environment.
         *
         * @param stream the input stream to the file containing the test environment; may not be null
         * @return the new test specification; never null
         */
        public TestSpecification withEnvironment(InputStream stream) {
            Properties props = new Properties();
            try {
                props.load(stream);
            }
            catch (IOException e) {
                fail("Failed to read the environment input stream: " + e.getMessage());
                return null;
            }
            return withEnvironment(Configuration.from(props));
        }

        /**
         * Create a new test specification that is a copy of this specification except with the given data.
         *
         * @param dataSupplier the function that returns the new data when needed, given the TestSpecification that will own it;
         *            may be null if there is no test data
         * @return the new test specification; never null
         */
        public TestSpecification withTestData(Function<TestSpecification, TestData> dataSupplier) {
            return new TestSpecification(name, config, env, dataSupplier, variables);
        }

        /**
         * Create a new test specification that is a copy of this specification except with the test data read from
         * or able to be written to the given file.
         *
         * @param path the path to the file where the test data can be read or to which it is to be written; may not be null
         * @return the new test specification; never null
         */
        public TestSpecification withReadOrWriteTestData(Path path) {
            return withReadOrWriteTestData(path.toFile());
        }

        /**
         * Create a new test specification that is a copy of this specification except with the test data read from
         * or able to be written to the given file.
         *
         * @param file the file where the test data can be read or to which it is to be written; may not be null
         * @return the new test specification; never null
         */
        public TestSpecification withReadOrWriteTestData(File file) {
            if (file.exists()) {
                return readJsonTestData(file);
            }
            return writeJsonTestData(file);
        }

        /**
         * Create a new test specification that is a copy of this specification except with the test data read from
         * the specified file.
         *
         * @param path the path to the test data; may not be null
         * @return the new test specification; never null
         */
        public TestSpecification readJsonTestData(Path path) {
            return readJsonTestData(path.toFile());
        }

        /**
         * Create a new test specification that is a copy of this specification except with the test data read from
         * the specified file.
         *
         * @param file the test data; may not be null
         * @return the new test specification; never null
         */
        public TestSpecification readJsonTestData(File file) {
            return readJsonTestData(() -> new FileInputStream(file));
        }

        /**
         * Create a new test specification that is a copy of this specification except with the test data read from
         * the specified input stream. The supplied stream is read and variables substituted when the {@link #testData() test
         * data} is {@link TestData#read() read}.
         *
         * @param stream the supplier to the stream of test data; may not be null
         * @return the new test specification; never null
         */
        public TestSpecification readJsonTestData(InputStreamSupplier stream) {
            Function<TestSpecification, TestData> supplier = (spec) -> {
                // We need the variables from the TestSpecification that will own this data ...
                AvailableVariables variables = spec.variables();
                return () -> {
                    // Copy the content into a temporary file and replace variables line by line ...
                    File tmpFile = replaceVariables(stream.get(), variables);
                    Array arrayOfDocuments = ArrayReader.defaultReader().readArray(tmpFile);
                    return Iterators.readOnly(arrayOfDocuments.iterator(), (entry) -> {
                        Value value = entry.getValue();
                        return value.asDocument();
                    });
                };
            };
            return new TestSpecification(name, config, env, supplier, variables);
        }

        /**
         * Create a new test specification that is a copy of this specification except with a {@link TestData} that can capture
         * the connector output as expected records.
         *
         * @param path the path to the file to which the test data is to be written; may not be null
         * @return the new test specification; never null
         */
        public TestSpecification writeJsonTestData(Path path) {
            return writeJsonTestData(path.toFile());
        }

        /**
         * Create a new test specification that is a copy of this specification except with a {@link TestData} that can capture
         * the connector output as expected records.
         *
         * @param file the file to which the test data is to be written; may not be null
         * @return the new test specification; never null
         */
        public TestSpecification writeJsonTestData(File file) {
            return writeJsonTestData(() -> new FileOutputStream(file));
        }

        /**
         * Create a new test specification that is a copy of this specification except with a {@link TestData} that can capture
         * the connector output as expected records.
         *
         * @param stream the stream to which the test data is to be written; may not be null
         * @return the new test specification; never null
         */
        public TestSpecification writeJsonTestData(OutputStreamSupplier stream) {
            Function<TestSpecification, TestData> supplier = (spec) -> {
                // This data does not require anything from the test specification, so our supplier can just return our data ...
                return new TestData() {
                    private List<Document> recorded = new ArrayList<>();

                    @Override
                    public Iterator<Document> read() {
                        return null;
                    }

                    @Override
                    public void write(Document sourceRecord) {
                        recorded.add(sourceRecord);
                    }

                    @Override
                    public void close() throws IOException {
                        Iterable<?> docs = recorded;
                        Array arrayOfDocs = Array.create(docs);
                        try (OutputStream str = stream.get()) {
                            ArrayWriter.prettyWriter().write(arrayOfDocs, str); // closes the stream
                        }
                        TestData.super.close();
                    }
                };
            };
            return new TestSpecification(name, config, env, supplier, variables);
        }

        /**
         * Create a new test specification that is a copy of this specification except with the given variables.
         *
         * @param variables the new variables that should replace this specification's variables
         * @return the new test specification; never null
         */
        public TestSpecification withVariables(AvailableVariables variables) {
            return new TestSpecification(name, config, env, dataSupplier, variables);
        }

        /**
         * Create a new test specification that is a copy of this specification except with the given variables.
         *
         * @param variableSupplier the function used to obtain the variables using the configuration
         * @return the new test specification; never null
         */
        public TestSpecification withVariables(VariableSupplier variableSupplier) {
            try {
                Map<String, String> variables = variableSupplier.get(config);
                return withVariables(variables::get);
            }
            catch (Throwable t) {
                t.printStackTrace(System.err);
                fail("Unable to read variables using configuration: " + config);
                return null;
            }
        }
    }

    @FunctionalInterface
    protected static interface VariableSupplier {
        Map<String, String> get(Configuration config) throws Exception;
    }

    @FunctionalInterface
    protected static interface InputStreamSupplier {
        InputStream get() throws IOException;
    }

    @FunctionalInterface
    protected static interface OutputStreamSupplier {
        OutputStream get() throws IOException;
    }

    protected void addValueComparatorsByFieldPath(BiConsumer<String, RecordValueComparator> comparatorsByPath) {
        // add none of them by default
    }

    protected void addValueComparatorsBySchemaName(BiConsumer<String, RecordValueComparator> comparatorsByPath) {
        // add none of them by default
    }

    /**
     * Create a new test specification with the given name.
     *
     * @param name the name of the test
     * @return the test specification; never null
     */
    protected TestSpecification usingSpec(String name) {
        return new TestSpecification(name, null, null, null, null);
    }

    /**
     * Create a new test specification that uses the files in the given directory for the connector configuration, environment
     * configuration,
     * and expected results. These names of these files are as follows:
     *
     * <ul>
     * <li>{@value #DEFAULT_CONNECTOR_PROPERTIES_FILENAME} for the connector configuration file and is required;</li>
     * <li>{@value #DEFAULT_CONNECTOR_PROPERTIES_FILENAME} for the environment configuration file and is optional; and</li>
     * <li>{@value #DEFAULT_EXPECTED_RECORDS_FILENAME} for the expected results file and will be generated if missing.</li>
     * </ul>
     *
     * @param name the name of the test
     * @param directory the path to the directory that contains the test files; may not be null
     * @return the test specification; never null
     */
    protected TestSpecification usingSpec(String name, String directory) {
        String version = System.getProperty("java.version");
        String modulePath = "";
        /**
         * Java 11 seems to have issues finding the file when tests are invoked from the root directory. However, running
         * the tests directory from the module directory work, so we only need to mess with the path when we are invoking
         * the tests from the root.
         */
        if (!version.startsWith("1.")) {
            if (!Paths.get(directory).toAbsolutePath().toString().contains("debezium-embedded")) {
                modulePath = "debezium-embedded/";
            }
        }
        return usingSpec(name, Paths.get(modulePath + directory));
    }

    /**
     * Create a new test specification that uses the files in the given directory for the connector configuration, environment
     * configuration, and expected results. These names of these files are as follows:
     *
     * <ul>
     * <li>{@value #DEFAULT_CONNECTOR_PROPERTIES_FILENAME} for the connector configuration file and is required;</li>
     * <li>{@value #DEFAULT_CONNECTOR_PROPERTIES_FILENAME} for the environment configuration file and is optional; and</li>
     * <li>{@value #DEFAULT_EXPECTED_RECORDS_FILENAME} for the expected results file and will be generated if missing.</li>
     * </ul>
     *
     * @param name the name of the test
     * @param directory the path to the directory that contains the test files; may not be null
     * @return the test specification; never null
     */
    protected TestSpecification usingSpec(String name, Path directory) {
        directory = directory.toAbsolutePath();
        Path configFile = directory.resolve(DEFAULT_CONNECTOR_PROPERTIES_FILENAME);
        Path expectedRecordsFile = directory.resolve(DEFAULT_EXPECTED_RECORDS_FILENAME);
        Path envFile = directory.resolve(DEFAULT_ENV_PROPERTIES_FILENAME);
        return usingSpec(name).withConfiguration(configFile.toFile())
                .withEnvironment(envFile.toFile())
                .withReadOrWriteTestData(expectedRecordsFile);
    }

    /**
     * Create a new test specification that uses the given files for the configuration, environment, and expected results.
     *
     * @param name the name of the test or test run; may not be null
     * @param configFile the path to the configuration file; may not be null
     * @param expectedRecordsFile the path to the file where the expected records can be read or where they are to be written;
     *            may not be null
     * @param envFile the path to the file containing the environment properties; may be null if there is no such file
     * @return the test specification; never null
     */
    protected TestSpecification usingSpec(String name, String configFile, String expectedRecordsFile, String envFile) {
        return usingSpec(name, Paths.get(configFile), Paths.get(expectedRecordsFile), envFile != null ? Paths.get(envFile) : null);
    }

    /**
     * Create a new test specification that uses the given files for the configuration, environment, and expected results.
     *
     * @param name the name of the test or test run; may not be null
     * @param configFile the path to the configuration file; may not be null
     * @param expectedRecordsFile the path to the file where the expected records can be read or where they are to be written;
     *            may not be null
     * @param envFile the path to the file containing the environment properties; may be null if there is no such file
     * @return the test specification; never null
     */
    protected TestSpecification usingSpec(String name, Path configFile, Path expectedRecordsFile, Path envFile) {
        return usingSpec(name).withConfiguration(configFile.toFile())
                .withEnvironment(envFile.toFile())
                .withReadOrWriteTestData(expectedRecordsFile);
    }

    @Before
    public void cleanOffsetStorage() {
        Testing.Print.enable();
        Testing.Files.delete(CONNECTOR_OUTPUT_PATH);
        Testing.Files.delete(OFFSET_STORE_PATH);
        OFFSET_STORE_PATH.getParent().toFile().mkdirs();
    }

    @After
    public void afterEachTestMethod() {
        Testing.Print.disable();
    }

    /**
     * Return the names of the fields that should always be ignored for all tests. By default this method returns {@code null}.
     *
     * @return the array of field names that should always be ignored, or empty or null if there are no such fields
     */
    protected String[] globallyIgnorableFieldNames() {
        return null;
    }

    /**
     * Run the connector that uses the files in the given directory for the connector configuration, environment configuration,
     * and expected results. These names of these files are as follows:
     *
     * <ul>
     * <li>{@value #DEFAULT_CONNECTOR_PROPERTIES_FILENAME} for the connector configuration file and is required;</li>
     * <li>{@value #DEFAULT_CONNECTOR_PROPERTIES_FILENAME} for the environment configuration file and is optional; and</li>
     * <li>{@value #DEFAULT_EXPECTED_RECORDS_FILENAME} for the expected results file and will be generated if missing.</li>
     * </ul>
     *
     * @param testName the name of the test or test run; may not be null
     * @param directory the path to the directory that contains the test files; may not be null
     */
    protected void runConnector(String testName, String directory) {
        runConnector(usingSpec(testName, directory));
    }

    /**
     * Run the connector that uses the files in the given directory for the connector configuration, environment configuration,
     * and expected results. These names of these files are as follows:
     *
     * <ul>
     * <li>{@value #DEFAULT_CONNECTOR_PROPERTIES_FILENAME} for the connector configuration file and is required;</li>
     * <li>{@value #DEFAULT_CONNECTOR_PROPERTIES_FILENAME} for the environment configuration file and is optional; and</li>
     * <li>{@value #DEFAULT_EXPECTED_RECORDS_FILENAME} for the expected results file and will be generated if missing.</li>
     * </ul>
     *
     * @param testName the name of the test or test run; may not be null
     * @param directory the path to the directory that contains the test files; may not be null
     */
    protected void runConnector(String testName, Path directory) {
        runConnector(usingSpec(testName, directory));
    }

    /**
     * Run the connector described by the supplied test specification.
     *
     * @param testName the name of the test or test run; may not be null
     * @param configFile the path to the configuration file; may not be null
     * @param expectedRecordsFile the path to the file where the expected records can be read or where they are to be written;
     *            may not be null
     * @param envFile the path to the file containing the environment properties; may be null if there is no such file
     */
    protected void runConnector(String testName, String configFile, String expectedRecordsFile, String envFile) {
        runConnector(usingSpec(testName, configFile, expectedRecordsFile, envFile));
    }

    /**
     * Run the connector described by the supplied test specification.
     *
     * @param testName the name of the test or test run; may not be null
     * @param configFile the path to the configuration file; may not be null
     * @param expectedRecordsFile the path to the file where the expected records can be read or where they are to be written;
     *            may not be null
     * @param envFile the path to the file containing the environment properties; may be null if there is no such file
     */
    protected void runConnector(String testName, Path configFile, Path expectedRecordsFile, Path envFile) {
        runConnector(usingSpec(testName, configFile, expectedRecordsFile, envFile));
    }

    /**
     * Run the connector described by the supplied test specification.
     *
     * @param spec the test specification
     */
    protected void runConnector(TestSpecification spec) {
        runConnector(spec, null);
    }

    /**
     * Run the connector described by the supplied test specification.
     *
     * @param spec the test specification
     * @param callback the function that should be called when the connector is stopped
     */
    protected void runConnector(TestSpecification spec, CompletionCallback callback) {
        PreviousContext preRunContext = LoggingContext.forConnector(getClass().getSimpleName(), "runner", spec.name());
        final Configuration environmentConfig = Configuration.copy(spec.environment()).build();
        final Configuration connectorConfig = spec.config();
        String[] ignorableFieldNames = environmentConfig.getString(ENV_IGNORE_FIELDS, "").split(",");
        final Set<String> ignorableFields = Arrays.stream(ignorableFieldNames).map(String::trim).collect(Collectors.toSet());
        String[] globallyIgnorableFieldNames = globallyIgnorableFieldNames();
        if (globallyIgnorableFieldNames != null && globallyIgnorableFieldNames.length != 0) {
            ignorableFields.addAll(Arrays.stream(globallyIgnorableFieldNames).map(String::trim).collect(Collectors.toSet()));
        }
        final SchemaAndValueConverter keyConverter = new SchemaAndValueConverter(environmentConfig, true);
        final SchemaAndValueConverter valueConverter = new SchemaAndValueConverter(environmentConfig, false);
        final TestData testData = spec.testData();

        // Get any special comparators ...
        final Map<String, RecordValueComparator> comparatorsByFieldName = new HashMap<>();
        addValueComparatorsByFieldPath(comparatorsByFieldName::put);
        final Map<String, RecordValueComparator> comparatorsBySchemaName = new HashMap<>();
        addValueComparatorsBySchemaName(comparatorsBySchemaName::put);

        RuntimeException runError = null;
        CompletionResult problem = new CompletionResult(callback);
        try {
            // Set up the test data ...
            final PreviewIterator<Document> expectedRecords = Iterators.preview(testData.read());
            final Consumer<Document> recorder = testData::write;

            // We need something that will measure the amount of time since our consumer has seen a record ...
            TimeSince timeSinceLastRecord = Threads.timeSince(Clock.SYSTEM);

            // We'll keep the last 10 expected and actual records so that there is some context if they don't match ...
            Queue<SourceRecord> actualRecordHistory = fixedSizeQueue(10);
            Queue<SourceRecord> expectedRecordHistory = fixedSizeQueue(10);

            // Define what happens for each record ...
            ConsumerCompletion result = new ConsumerCompletion();
            Consumer<SourceRecord> consumer = (actualRecord) -> {
                PreviousContext prev = LoggingContext.forConnector(getClass().getSimpleName(), "runner", spec.name());
                try {
                    Testing.debug("actual record:    " + SchemaUtil.asString(actualRecord));
                    timeSinceLastRecord.reset();

                    // Record the actual in the history ...
                    actualRecordHistory.add(actualRecord);

                    // And possibly hand it to the test's recorder ...
                    try {
                        Document jsonRecord = serializeSourceRecord(actualRecord, keyConverter, valueConverter);
                        if (jsonRecord != null) {
                            recorder.accept(jsonRecord);
                        }
                    }
                    catch (IOException e) {
                        String msg = "Error converting JSON to SourceRecord";
                        Testing.debug(msg);
                        throw new ConnectException(msg, e);
                    }

                    if (expectedRecords != null) {
                        // Get the test's next expected record ...
                        if (!expectedRecords.hasNext()) {
                            // We received an actual record but don't have or expect one ...
                            String msg = "Source record found but nothing expected";
                            result.error();
                            Testing.debug(msg);
                            throw new MismatchRecordException(msg, actualRecordHistory, expectedRecordHistory);
                        }
                        Document expected = expectedRecords.next();
                        if (isEndCommand(expected)) {
                            result.error();
                            String msg = "Source record was found but not expected: " + SchemaUtil.asString(actualRecord);
                            Testing.debug(msg);
                            throw new MismatchRecordException(msg, actualRecordHistory, expectedRecordHistory);
                        }
                        else if (isCommand(expected)) {
                            Testing.debug("applying command: " + SchemaUtil.asString(expected));
                            applyCommand(expected, result);
                        }
                        else {
                            try {
                                // Otherwise, build a record from the expected and add it to the history ...
                                SourceRecord expectedRecord = rehydrateSourceRecord(expected, keyConverter, valueConverter);
                                expectedRecordHistory.add(expectedRecord);
                                Testing.debug("expected record:  " + SchemaUtil.asString(expectedRecord));

                                // And compare the records ...
                                try {
                                    assertSourceRecordMatch(actualRecord, expectedRecord, ignorableFields::contains,
                                            comparatorsByFieldName, comparatorsBySchemaName);
                                }
                                catch (AssertionError e) {
                                    result.error();
                                    String msg = "Source record with key " + SchemaUtil.asString(actualRecord.key())
                                            + " did not match expected record: " + e.getMessage();
                                    Testing.debug(msg);
                                    throw new MismatchRecordException(e, msg, actualRecordHistory, expectedRecordHistory);
                                }
                            }
                            catch (IOException e) {
                                result.exception();
                                String msg = "Error converting JSON to SourceRecord";
                                Testing.debug(msg);
                                throw new ConnectException(msg, e);
                            }
                        }

                        if (!expectedRecords.hasNext()) {
                            // We expect no more records, so stop the connector ...
                            result.stop();
                            String msg = "Stopping connector after no more expected records found";
                            Testing.debug(msg);
                            throw new StopConnectorException(msg);
                        }

                        // Peek at the next record to see if it is a command ...
                        Document nextExpectedRecord = expectedRecords.peek();
                        if (isCommand(nextExpectedRecord)) {
                            // consume it and apply it ...
                            applyCommand(expectedRecords.next(), result);
                        }
                    }

                }
                finally {
                    prev.restore();
                }
            };

            // Set up the configuration for the engine to include the connector configuration and apply as defaults
            // the environment and engine parameters ...
            Configuration engineConfig = Configuration.copy(connectorConfig)
                    .withDefault(environmentConfig)
                    .withDefault(EmbeddedEngine.ENGINE_NAME, spec.name())
                    .withDefault(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH)
                    .withDefault(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)
                    .build();

            // Create the engine ...
            EmbeddedEngine engine = EmbeddedEngine.create()
                    .using(engineConfig)
                    .notifying(consumer)
                    .using(this.getClass().getClassLoader())
                    .using(problem)
                    .build();

            long connectorTimeoutInSeconds = environmentConfig.getLong(ENV_CONNECTOR_TIMEOUT_IN_SECONDS, 10);
            // Get ready to run the connector one or more times ...
            do {
                // Each time create a thread that will stop our connector if we don't get enough results
                Thread timeoutThread = Threads.timeout(spec.name() + "-connector-output",
                        connectorTimeoutInSeconds, TimeUnit.SECONDS,
                        timeSinceLastRecord,
                        engine::stop);
                // But plan to stop our timeout thread as soon as the connector completes ...
                result.uponCompletion(timeoutThread::interrupt);
                timeoutThread.start();

                // Run the connector and block until the connector is stopped by the timeout thread or until
                // an exception is thrown within the connector (perhaps by the consumer) ...
                Testing.debug("Starting connector");
                result.reset();
                engine.run();
            } while (result.get() == ExecutionResult.RESTART_REQUESTED);
        }
        catch (IOException e) {
            runError = new RuntimeException("Error reading test data: " + e.getMessage(), e);
        }
        catch (RuntimeException t) {
            runError = t;
        }
        finally {
            // And clean up everything ...
            try {
                testData.close();
            }
            catch (IOException e) {
                if (runError != null) {
                    runError = new RuntimeException("Error closing test data: " + e.getMessage(), e);
                }
            }
            finally {
                try {
                    keyConverter.close();
                }
                finally {
                    try {
                        valueConverter.close();
                    }
                    finally {
                        preRunContext.restore();
                    }
                }
            }
        }
        if (runError != null) {
            throw runError;
        }

        if (problem.hasError()) {
            Throwable error = problem.error();
            if (error instanceof AssertionError) {
                fail(problem.message());
            }
            else if (error instanceof MismatchRecordException) {
                MismatchRecordException mismatch = (MismatchRecordException) error;
                LinkedList<SourceRecord> actualHistory = mismatch.getActualRecords();
                LinkedList<SourceRecord> expectedHistory = mismatch.getExpectedRecords();
                Testing.print("");
                Testing.print("FAILURE in connector integration test '" + spec.name() + "' in class " + getClass());
                Testing.print(" actual record:   " + SchemaUtil.asString(actualHistory.getLast()));
                Testing.print(" expected record: " + SchemaUtil.asString(expectedHistory.getLast()));
                Testing.print(mismatch.getMessage());
                Testing.print("");
                AssertionError cause = ((MismatchRecordException) error).getError();
                if (cause != null) {
                    throw cause;
                }
                fail(problem.message());
            }
            else if (error instanceof RuntimeException) {
                throw (RuntimeException) error;
            }
            else {
                throw new RuntimeException(error);
            }
        }
    }

    private void applyCommand(Document record, ConsumerCompletion result) {
        if (isCommand(record)) {
            Testing.debug("applying command: " + SchemaUtil.asString(record));
            String command = record.getString(CONTROL_KEY);
            if (CONTROL_RESTART.equalsIgnoreCase(command)) {
                // We're supposed to restart the connector, so stop it ...
                result.restartRequested();
                String msg = "Stopping connector after record as requested";
                Testing.debug(msg);
                throw new StopConnectorException(msg);
            }
            else if (CONTROL_STOP.equalsIgnoreCase(command)) {
                // We're supposed to restart the connector, so stop it ...
                result.stop();
                String msg = "Stopping connector after record as requested";
                Testing.debug(msg);
                throw new StopConnectorException(msg);
            }
        }
    }

    private boolean isCommand(Document record) {
        return record.has(CONTROL_KEY);
    }

    private boolean isEndCommand(Document record) {
        return isCommand(record) && CONTROL_END.equalsIgnoreCase(getCommand(record));
    }

    private String getCommand(Document record) {
        return record.getString(CONTROL_KEY);
    }

    private static class ConsumerCompletion {
        private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
        private final AtomicReference<ExecutionResult> result = new AtomicReference<>();

        public void stop() {
            setResult(ExecutionResult.STOPPED);
        }

        public void error() {
            setResult(ExecutionResult.ERROR);
        }

        public void exception() {
            setResult(ExecutionResult.EXCEPTION);
        }

        public void restartRequested() {
            setResult(ExecutionResult.RESTART_REQUESTED);
        }

        private void setResult(ExecutionResult result) {
            try {
                Runnable r = uponCompletion.getAndSet(null);
                if (r != null) {
                    r.run();
                }
            }
            finally {
                this.result.compareAndSet(null, result);
            }
        }

        public void uponCompletion(Runnable r) {
            uponCompletion.set(r);
        }

        public ExecutionResult get() {
            return this.result.get();
        }

        public void reset() {
            this.result.set(null);
        }
    }

    private SourceRecord rehydrateSourceRecord(Document record, SchemaAndValueConverter keyConverter,
                                               SchemaAndValueConverter valueConverter)
            throws IOException {
        Document sourcePartitionDoc = record.getDocument("sourcePartition");
        Document sourceOffsetDoc = record.getDocument("sourceOffset");
        String topic = record.getString("topic");
        Integer kafkaPartition = record.getInteger("kafkaPartition");
        Document keySchema = record.getDocument("keySchema");
        Document valueSchema = record.getDocument("valueSchema");
        Document key = record.getDocument("key");
        Document value = record.getDocument("value");
        Document keyAndSchemaDoc = Document.create("schema", keySchema, "payload", key);
        Document valueAndSchemaDoc = Document.create("schema", valueSchema, "payload", value);
        SchemaAndValue keyWithSchema = keyConverter.deserialize(topic, keyAndSchemaDoc);
        SchemaAndValue valueWithSchema = valueConverter.deserialize(topic, valueAndSchemaDoc);
        Map<String, ?> sourcePartition = toMap(sourcePartitionDoc);
        Map<String, ?> sourceOffset = toMap(sourceOffsetDoc);
        return new SourceRecord(sourcePartition, sourceOffset, topic, kafkaPartition,
                keyWithSchema.schema(), keyWithSchema.value(),
                valueWithSchema.schema(), valueWithSchema.value());
    }

    /**
     * Serialize the source record to document form.
     *
     * @param record the record; may not be null
     * @param keyConverter the converter for the record key's schema and payload
     * @param valueConverter the converter for the record value's schema and payload
     * @return the document form of the source record; never null
     * @throws IOException if there is an error converting the key or value
     */
    private Document serializeSourceRecord(SourceRecord record, SchemaAndValueConverter keyConverter,
                                           SchemaAndValueConverter valueConverter)
            throws IOException {
        Document keyAndSchema = keyConverter.serialize(record.topic(), record.keySchema(), record.key());
        Document valueAndSchema = valueConverter.serialize(record.topic(), record.valueSchema(), record.value());
        Document sourcePartition = Document.create().putAll(record.sourcePartition());
        Document sourceOffset = Document.create().putAll(record.sourceOffset());
        Document parent = Document.create();
        parent.set("sourcePartition", sourcePartition);
        parent.set("sourceOffset", sourceOffset);
        parent.set("topic", record.topic());
        parent.set("kafkaPartition", record.kafkaPartition());
        parent.set("keySchema", keyAndSchema.getDocument("schema"));
        parent.set("key", keyAndSchema.getDocument("payload"));
        parent.set("valueSchema", valueAndSchema.getDocument("schema"));
        parent.set("value", valueAndSchema.getDocument("payload"));
        return parent;
    }

    private void assertSourceRecordMatch(SourceRecord actual, SourceRecord expected, Predicate<String> ignoreFields,
                                         Map<String, RecordValueComparator> comparatorsByName,
                                         Map<String, RecordValueComparator> comparatorsBySchemaName) {
        try {
            VerifyRecord.isValid(actual);
        }
        catch (AssertionError e) {
            throw new AssertionError("Actual source record is not valid: " + e.getMessage());
        }
        try {
            VerifyRecord.isValid(expected);
        }
        catch (AssertionError e) {
            throw new AssertionError("Expected source record is not valid: " + e.getMessage());
        }
        VerifyRecord.assertEquals(actual, expected, ignoreFields, comparatorsByName, comparatorsBySchemaName);
    }

    private static class SchemaAndValueConverter implements AutoCloseable {
        private final JsonConverter jsonConverter = new JsonConverter();
        private final JsonSerializer jsonSerializer = new JsonSerializer();
        private final JsonDeserializer jsonDeserializer = new JsonDeserializer();
        private final ObjectMapper mapper = new ObjectMapper();
        private final DocumentReader jsonReader = DocumentReader.defaultReader();

        public SchemaAndValueConverter(Configuration config, boolean isKey) {
            jsonConverter.configure(config.asMap(), isKey);
            jsonSerializer.configure(config.asMap(), isKey);
            jsonDeserializer.configure(config.asMap(), isKey);
        }

        public SchemaAndValue deserialize(String topic, Document doc) throws IOException {
            String jsonString = doc.toString();
            JsonNode jsonNode = mapper.readTree(jsonString);
            byte[] rawValue = jsonSerializer.serialize(topic, jsonNode);
            return jsonConverter.toConnectData(topic, rawValue);
        }

        public Document serialize(String topic, Schema schema, Object value) throws IOException {
            byte[] rawBytes = jsonConverter.fromConnectData(topic, schema, value);
            JsonNode jsonNode = jsonDeserializer.deserialize(topic, rawBytes);
            String jsonStr = mapper.writeValueAsString(jsonNode);
            return jsonReader.read(jsonStr);
        }

        @Override
        public void close() {
            try {
                jsonSerializer.close();
            }
            finally {
                jsonDeserializer.close();
            }
        }
    }

    private Map<String, ?> toMap(Document doc) {
        Map<String, Object> result = new HashMap<>();
        doc.forEach(field -> {
            result.put(field.getName().toString(), field.getValue().asObject());
        });
        return result;
    }

    private <T> Queue<T> fixedSizeQueue(int size) {
        return new LinkedList<T>() {
            private static final long serialVersionUID = 1L;

            @Override
            public boolean add(T o) {
                super.add(o);
                while (size() > size) {
                    super.remove();
                }
                return true;
            }
        };
    }

    /**
     * Read the contents of the supplied {@link InputStream}, replace all variables found in the content, and write the
     * result to a temporary file.
     *
     * @param stream the input stream containing zero or more {@link Strings#replaceVariables(String, java.util.function.Function)
     *            variable expressions}
     * @param variables the variables
     * @return the temporary file that exists in the data directory
     * @throws IOException if there is a problem reading the input stream or writing to the temporary file
     */
    protected static File replaceVariables(InputStream stream, AvailableVariables variables) throws IOException {
        File tmpFile = Testing.Files.createTestingFile();
        try (OutputStream ostream = new FileOutputStream(tmpFile)) {
            IoUtil.readLines(stream, (line) -> {
                String newLine = Strings.replaceVariables(line, variables::variableForName);
                try {
                    ostream.write(newLine.getBytes(StandardCharsets.UTF_8));
                }
                catch (IOException e) {
                    throw new RuntimeException("Error writing to file '" + tmpFile + "'", e);
                }
            }, StandardCharsets.UTF_8);
        }
        return tmpFile;
    }
}
